package org.zstacks.znet.pool;

import java.io.IOException;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.zstacks.znet.RemotingClient;
import org.zstacks.znet.nio.Dispatcher;

/**
 * Object pool of {@link RemotingClient} instances that share a single
 * {@link Dispatcher}.
 * <p>
 * The dispatcher is taken from the supplied configuration when it provides one.
 * Otherwise the pool builds its own from the configured selector and executor
 * counts, and that self-created dispatcher is closed again by {@link #close()}.
 * A dispatcher supplied through the configuration is never closed by the pool.
 */
public class RemotingClientPool extends GenericObjectPool<RemotingClient>{ 
	private Dispatcher dispatcher = null;
	/** True only while this pool holds a dispatcher it created itself and has not yet closed. */
	private boolean ownDispatcher = false;
	
	/**
	 * Creates the pool, resolves (or creates) the dispatcher, starts it and hands
	 * it to the client factory.
	 *
	 * @param config pool settings; also supplies the broker address and,
	 *               optionally, an existing dispatcher to share
	 * @throws IOException declared for failures while setting up the dispatcher
	 */
	public RemotingClientPool(RemotingClientPoolConfig config) throws IOException{
		super(new RemotingClientFactory(config), config);
		
		RemotingClientFactory factory = (RemotingClientFactory)getFactory();
		boolean ready = false;
		try {
			this.dispatcher = config.getDispatcher();
			if(this.dispatcher == null){
				this.dispatcher = new Dispatcher()
					.selectorCount(config.getSelectorCount())
					.executorCount(config.getExecutorCount());
				
				this.ownDispatcher = true;
			}
			this.dispatcher.start();
			
			factory.dispatcher = this.dispatcher;
			ready = true;
		} finally {
			if(!ready){
				// The base pool is already constructed at this point (and, depending
				// on configuration, may have started background work), so release it
				// and any dispatcher we created before the exception propagates.
				releaseResources();
			}
		}
	}  

	/**
	 * @return the dispatcher used by the clients of this pool
	 */
	public Dispatcher getDispatcher(){
		return this.dispatcher;
	}
	
	/**
	 * Closes the pool and, if this pool created the dispatcher itself, closes the
	 * dispatcher as well. Calling it again does not close the dispatcher a second time.
	 */
	@Override
	public void close() {  
		releaseResources();
	}

	/** Closes the base pool and then the owned dispatcher, even if closing the pool fails. */
	private void releaseResources(){
		try {
			super.close();
		} finally {
			closeOwnedDispatcher();
		}
	}

	/** Closes the dispatcher at most once, and only when this pool created it. */
	private void closeOwnedDispatcher(){
		if(!ownDispatcher || this.dispatcher == null){
			return;
		}
		// Drop ownership first so a repeated close() cannot close the dispatcher twice.
		ownDispatcher = false;
		try {
			this.dispatcher.close();
		} catch (IOException ignored) {
			// Best-effort shutdown: close() cannot declare a checked exception and
			// there is nothing further to release if the dispatcher fails to close.
		}
	}

}

/**
 * Pooled-object factory that creates, validates and destroys
 * {@link RemotingClient} instances for a single broker address.
 * <p>
 * The dispatcher is not known at construction time; it is assigned to
 * {@link #dispatcher} afterwards by the owning pool.
 */
class RemotingClientFactory extends BasePooledObjectFactory<RemotingClient> {
	/**
	 * Dispatcher handed to every created client. It is assigned after construction,
	 * so it is volatile to make the assignment visible to whichever thread ends up
	 * calling {@link #create()}.
	 */
	volatile Dispatcher dispatcher;
	private final String broker; 
	
	/**
	 * @param config configuration supplying the broker address
	 * @throws IOException never thrown by this body; the clause is kept so existing callers still compile
	 */
	public RemotingClientFactory(RemotingClientPoolConfig config) throws IOException{
		this.broker = config.getBrokerAddress();
	}

	/**
	 * Creates a new client for the configured broker using the injected dispatcher.
	 *
	 * @return a new client instance
	 * @throws IllegalStateException if no dispatcher has been assigned yet
	 * @throws Exception if the client cannot be created
	 */
	@Override
	public RemotingClient create() throws Exception { 
		// Read the volatile field once so the null check and the use agree.
		Dispatcher current = this.dispatcher;
		if(current == null){
			// Fail fast instead of handing the pool a client built without a dispatcher.
			throw new IllegalStateException("Dispatcher has not been set on RemotingClientFactory");
		}
		return new RemotingClient(broker, current); 
	}

	/**
	 * Wraps a client in the default pooled-object holder.
	 */
	@Override
	public PooledObject<RemotingClient> wrap(RemotingClient obj) { 
		return new DefaultPooledObject<RemotingClient>(obj);
	} 
	
	/**
	 * Closes the client held by the pooled object that is being discarded.
	 */
	@Override
	public void destroyObject(PooledObject<RemotingClient> p) throws Exception {
		RemotingClient client = p.getObject();
		client.close();
	}
	
	/**
	 * A pooled client is considered valid when it reports that it has connected.
	 */
	@Override
	public boolean validateObject(PooledObject<RemotingClient> p) {
		return p.getObject().hasConnected();
	}
}